/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.fnexecution.environment;



import com.bff.gaia.unified.fn.harness.FnHarness;

import com.bff.gaia.unified.model.pipeline.v1.Endpoints.ApiServiceDescriptor;

import com.bff.gaia.unified.model.pipeline.v1.RunnerApi.Environment;

import com.bff.gaia.unified.runners.fnexecution.GrpcFnServer;

import com.bff.gaia.unified.runners.fnexecution.InProcessServerFactory;

import com.bff.gaia.unified.runners.fnexecution.ServerFactory;

import com.bff.gaia.unified.runners.fnexecution.artifact.ArtifactRetrievalService;

import com.bff.gaia.unified.runners.fnexecution.control.ControlClientPool;

import com.bff.gaia.unified.runners.fnexecution.control.FnApiControlClientPoolService;

import com.bff.gaia.unified.runners.fnexecution.control.InstructionRequestHandler;

import com.bff.gaia.unified.runners.fnexecution.logging.GrpcLoggingService;

import com.bff.gaia.unified.runners.fnexecution.provisioning.StaticGrpcProvisionService;

import com.bff.gaia.unified.sdk.fn.IdGenerator;

import com.bff.gaia.unified.sdk.fn.stream.OutboundObserverFactory;

import com.bff.gaia.unified.sdk.fn.test.InProcessManagedChannelFactory;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import java.time.Duration;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;



/**

 * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} which is executing in the

 * same process.

 */

public class EmbeddedEnvironmentFactory implements EnvironmentFactory {

  private static final Logger LOG = LoggerFactory.getLogger(EmbeddedEnvironmentFactory.class);



  private final PipelineOptions options;



  private final GrpcFnServer<GrpcLoggingService> loggingServer;

  private final GrpcFnServer<FnApiControlClientPoolService> controlServer;



  private final ControlClientPool.Source clientSource;



  public static EnvironmentFactory create(

      PipelineOptions options,

      GrpcFnServer<GrpcLoggingService> loggingServer,

      GrpcFnServer<FnApiControlClientPoolService> controlServer,

      ControlClientPool.Source clientSource) {

    return new EmbeddedEnvironmentFactory(options, loggingServer, controlServer, clientSource);

  }



  private EmbeddedEnvironmentFactory(

      PipelineOptions options,

      GrpcFnServer<GrpcLoggingService> loggingServer,

      GrpcFnServer<FnApiControlClientPoolService> controlServer,

      ControlClientPool.Source clientSource) {

    this.options = options;

    this.loggingServer = loggingServer;

    this.controlServer = controlServer;

    checkArgument(

        loggingServer.getApiServiceDescriptor() != null,

        "Logging Server cannot have a null %s",

        ApiServiceDescriptor.class.getSimpleName());

    checkArgument(

        controlServer.getApiServiceDescriptor() != null,

        "Control Server cannot have a null %s",

        ApiServiceDescriptor.class.getSimpleName());

    this.clientSource = clientSource;

  }



  @Override

  @SuppressWarnings("FutureReturnValueIgnored") // no need to monitor shutdown thread

  public RemoteEnvironment createEnvironment(Environment environment) throws Exception {

    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<?> fnHarness =

        executor.submit(

            () -> {

              try {

                FnHarness.main(

                    "id",

                    options,

                    loggingServer.getApiServiceDescriptor(),

                    controlServer.getApiServiceDescriptor(),

                    InProcessManagedChannelFactory.create(),

                    OutboundObserverFactory.clientDirect());

              } catch (NoClassDefFoundError e) {

                // TODO: https://issues.apache.org/jira/browse/BEAM-4384 load the FnHarness in a

                // Restricted classpath that we control for any user.

                LOG.error(

                    "{} while executing an in-process FnHarness. "

                        + "To use the {}, "

                        + "the 'com.bff.gaia.unified:unified-sdks-java-harness' artifact "

                        + "and its dependencies must be on the classpath",

                    NoClassDefFoundError.class.getSimpleName(),

                    EmbeddedEnvironmentFactory.class.getSimpleName(),

                    e);

                throw e;

              }

              return null;

            });

    executor.submit(

        () -> {

          try {

            fnHarness.get();

          } catch (Throwable t) {

            executor.shutdownNow();

          }

        });



    // TODO: find some way to populate the actual ID in FnHarness.main()

    InstructionRequestHandler handler = clientSource.take("", Duration.ofMinutes(1L));

    return RemoteEnvironment.forHandler(environment, handler);

  }



  /** Provider of EmbeddedEnvironmentFactory. */

  public static class Provider implements EnvironmentFactory.Provider {



    private final PipelineOptions pipelineOptions;



    public Provider(PipelineOptions pipelineOptions) {

      this.pipelineOptions = pipelineOptions;

    }



    @Override

    public EnvironmentFactory createEnvironmentFactory(

        GrpcFnServer<FnApiControlClientPoolService> controlServer,

        GrpcFnServer<GrpcLoggingService> loggingServer,

        GrpcFnServer<ArtifactRetrievalService> retrievalServer,

        GrpcFnServer<StaticGrpcProvisionService> provisioningServer,

        ControlClientPool clientPool,

        IdGenerator idGenerator) {

      return EmbeddedEnvironmentFactory.create(

          pipelineOptions, loggingServer, controlServer, clientPool.getSource());

    }



    @Override

    public ServerFactory getServerFactory() {

      return InProcessServerFactory.create();

    }

  }

}